WIP [dbnode] Buffer encoded commit log data and finalize batch resources async#2395
WIP [dbnode] Buffer encoded commit log data and finalize batch resources async#2395robskillington wants to merge 6 commits into
Conversation
| writer := newMockCommitLogWriter() | ||
|
|
||
| writer.writeFn = func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation) error { | ||
| writer.writeFn = func(ts.Series, ts.Datapoint, xtime.Unit, ts.Annotation, callbackFn) error { |
There was a problem hiding this comment.
Oh wow you can do this? I thought you needed to explicitly set the args as _ callbackFn etc
| // double calling callback. | ||
| var callback callbackFn | ||
| if i == lastWritableElem { | ||
| callback = write.callbackFn |
There was a problem hiding this comment.
Would it be useful adding a metric here to see how often this hits?
| numWritesSuccess++ | ||
| } | ||
| if lastWritableElem == -1 && write.callbackFn != nil { | ||
| // Call callback successfully if no elements actually needed to write. |
There was a problem hiding this comment.
Would it be useful adding a metric here to see how often this hits?
| // goroutines since write latency is very important. | ||
| // For | ||
| if numCPU := runtime.NumCPU(); numCPU <= 16 { | ||
| runtime.GOMAXPROCS(runtime.NumCPU() + 1) |
| lastWritableElem := -1 | ||
| for i := numDequeued - 1; i >= 0 && lastWritableElem == -1; i-- { | ||
| if batch[i].Err == nil && !batch[i].SkipWrite { | ||
| lastWritableElem = i |
There was a problem hiding this comment.
nit: can this break instead of having lastWritableElem == -1 in the guard? makes intentions clearer, and one fewer check per iteration is a bonus
| return r.rotateLogs, nil | ||
| } | ||
|
|
||
| func zeroToTenSecondsHighResDurationBuckets() tally.Buckets { |
There was a problem hiding this comment.
This seems like a lot of buckets, any concerns for cardinality / instrumentation perf here?
There was a problem hiding this comment.
Hm actually looks like the other buckets we have is ~60 so this is probably fine
| for i := 1; i <= 10; i++ { | ||
| buckets = append(buckets, time.Duration(i)*time.Second) | ||
| } | ||
| return buckets |
There was a problem hiding this comment.
Do we need a +Inf upper bound value here?
| } | ||
| b.flushCalls <- call | ||
|
|
||
| next := b.idx + 1 |
There was a problem hiding this comment.
nit: b.idx = (b.idx+1)%len(b.writers)? Or is len(b.writers) potentially 0
| pLen := len(p) | ||
| currLen := len(b.writers[b.idx].buffer) | ||
| newLen := currLen + pLen | ||
| if currLen > 0 && newLen > b.singleBufferLength { |
There was a problem hiding this comment.
any chance newLen > b.singleBufferlength * 2? Maybe this needs to be a for loop? Similarly if just len(p) is greater than a single buffer size
| writers []asyncFlushBufferWriter | ||
| total int | ||
| ) | ||
| for total+opts.singleBufferLength <= opts.totalBufferLength { |
There was a problem hiding this comment.
nit; can just use regular for here for total := 0 ; total < opts.totalBufferLength; total += opts.singleBufferLength {?
| if 2*flushSize > flushBufferSize { | ||
| flushBufferSize = 2 * flushSize | ||
| } |
There was a problem hiding this comment.
nit: Can we refactor FlushBufferSize to be something like NumBuffers or something to keep it a multiple of a single buffer size? Then Validate() can ensure FlushBufferSize > 2
834e25e to
99f874f
Compare
e130b7e to
3cf877e
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #2395 +/- ##
========================================
- Coverage 71.7% 71.6% -0.1%
========================================
Files 1052 1051 -1
Lines 93115 92958 -157
========================================
- Hits 66797 66613 -184
- Misses 21874 21894 +20
- Partials 4444 4451 +7
Flags with carried forward coverage won't be shown. Click here to find out more. Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
What this PR does / why we need it:
Allows pending IO to queue up while encode loop for the commit log can continue working.
Also reserves an entire gomaxproc for encode loop to avoid scheduling latency/being pre-empted while in the middle of critical encode loop.
Special notes for your reviewer:
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: